package discover

import (
	"context"
	"fmt"
	"gitee.com/zhancaihua/goyt/core/lang"
	"gitee.com/zhancaihua/goyt/core/logyt"
	"gitee.com/zhancaihua/goyt/core/proc"
	"gitee.com/zhancaihua/goyt/core/threading"
	clientv3 "go.etcd.io/etcd/client/v3"
	"google.golang.org/grpc"
	"io"
	"sort"
	"strings"
	"sync"
	"time"
)

var (
	// connManager is the package-level resource manager from which
	// cluster.getClient obtains etcd clients.
	// Kept as a global variable for safe publication.
	connManager = threading.NewResourceManager()
	// registry is the package-level Registry returned by GetRegistry.
	// Kept as a global variable for safe publication.
	registry = Registry{
		clusters: make(map[string]*cluster),
	}
)

// A Registry manages clusters. Code that needs a cluster obtains it from the
// registry, where clusters are indexed by getClusterKey(endpoints).
//
// clusters maps a cluster key to its cluster; lock is held by getCluster
// around every access to that map.
type Registry struct {
	clusters map[string]*cluster
	lock     sync.Mutex
}

// GetRegistry hands out a pointer to the single package-wide Registry.
// Every caller receives the same instance.
func GetRegistry() *Registry {
	return &registry
}

// getCluster looks up the cluster registered for endpoints. When none is
// registered yet, a new cluster is created and stored under the same key.
//
// exists reports whether the cluster was already present before this call.
func (r *Registry) getCluster(endpoints []string) (c *cluster, exists bool) {
	key := getClusterKey(endpoints)

	r.lock.Lock()
	defer r.lock.Unlock()

	if found, ok := r.clusters[key]; ok {
		return found, true
	}

	// First request for this endpoint set: create and register it.
	created := newCluster(endpoints)
	r.clusters[key] = created
	return created, false
}

// GetConn returns the etcd client that belongs to the cluster identified by
// endpoints, registering the cluster first when it is not known yet.
func (r *Registry) GetConn(endpoints []string) (EtcdClient, error) {
	// Whether the cluster already existed does not matter here.
	target, _ := r.getCluster(endpoints)
	return target.getClient()
}

// getClusterKey builds the identifier of a cluster from its endpoints: the
// endpoints in sorted order, joined with endpointsSeparator. The same set of
// endpoints therefore yields the same key regardless of the order given.
//
// The caller's slice is left untouched. Sorting happens on a private copy,
// because the slice may be shared between goroutines and this function is
// called without any lock held.
func getClusterKey(endpoints []string) string {
	sorted := make([]string, len(endpoints))
	copy(sorted, endpoints)
	sort.Strings(sorted)
	return strings.Join(sorted, endpointsSeparator)
}

// EtcdClient interface represents an etcd client.
//
// It lists the client operations this package relies on (for example Grant,
// Put, KeepAlive, Revoke and Ctx are used by Publisher). Because it includes
// Close() error, a value of this type can also be handed out as an io.Closer,
// which is how cluster.getClient passes it to the resource manager.
// NOTE(review): the method set looks like a subset of *clientv3.Client —
// confirm against NewClient's return type.
type EtcdClient interface {
	ActiveConnection() *grpc.ClientConn
	Close() error
	Ctx() context.Context
	Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error)
	Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
	Grant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error)
	KeepAlive(ctx context.Context, id clientv3.LeaseID) (<-chan *clientv3.LeaseKeepAliveResponse, error)
	Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error)
	Revoke(ctx context.Context, id clientv3.LeaseID) (*clientv3.LeaseRevokeResponse, error)
	Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan
}

// cluster holds a set of etcd endpoints together with the unique key derived
// from them, and is the entry point for obtaining the matching client
// instance (see getClient / newClient).
//
// NOTE(review): lock is not used by any code visible in this part of the
// file — confirm whether it is still needed.
type cluster struct {
	endpoints []string
	key       string
	lock      sync.Mutex
}

// newCluster builds a cluster for endpoints and precomputes its key with
// getClusterKey. The endpoints slice is stored as given (not copied).
func newCluster(endpoints []string) *cluster {
	c := new(cluster)
	c.endpoints = endpoints
	c.key = getClusterKey(endpoints)
	return c
}
// getClient asks connManager for the resource stored under c.key, supplying
// c.newClient as the factory, and returns it as an EtcdClient.
// Presumably the resource manager only calls the factory when nothing is
// cached for the key — confirm against threading.ResourceManager.
func (c *cluster) getClient() (EtcdClient, error) {
	factory := func() (io.Closer, error) {
		return c.newClient()
	}

	resource, err := connManager.GetResource(c.key, factory)
	if err != nil {
		return nil, err
	}

	// TODO: monitor the state of the etcd connection.
	return resource.(EtcdClient), nil
}
// newClient creates a fresh client for the cluster's endpoints via NewClient.
// On failure it returns an explicit nil interface value together with the
// error, so no partially built client leaks to the caller.
func (c *cluster) newClient() (EtcdClient, error) {
	client, err := NewClient(c.endpoints)
	if err == nil {
		return client, nil
	}
	return nil, err
}

// Publisher registers a server in etcd. It keeps the cluster addresses
// (endpoints), the namespace key, the service id, the value to register and
// the lease used for the registration.
//
// fullKey is the key actually written to etcd: key/id when an id was
// configured (id > 0), otherwise key/lease_id.
//
// ctx and cancel control the lifetime of the background keepalive goroutine,
// and wg lets Stop wait for it to finish.
// NOTE(review): pauseChan and resumeChan are created in NewPublisher but not
// used by any code visible in this part of the file.
type Publisher struct {
	endpoints  []string
	key        string
	fullKey    string
	id         int64
	value      string
	lease      clientv3.LeaseID
	ctx        context.Context
	cancel     func()
	pauseChan  chan lang.PlaceholderType
	resumeChan chan lang.PlaceholderType
	wg         sync.WaitGroup
}
// PubOption customizes a Publisher. NewPublisher applies every option it is
// given to the Publisher it is building.
type PubOption func(client *Publisher)

// WithId returns an option that stores id on the Publisher. A positive id is
// later used instead of the lease id when the full etcd key is built.
func WithId(id int64) PubOption {
	return func(p *Publisher) {
		p.id = id
	}
}

// WithPubEtcdAccount returns an option that records the etcd username and
// password for the Publisher's endpoints through AddAccount. The credentials
// are registered at the moment the option is applied.
func WithPubEtcdAccount(user, pass string) PubOption {
	return func(p *Publisher) {
		AddAccount(p.endpoints, user, pass)
	}
}

// NewPublisher creates a Publisher that will register value under key on the
// etcd cluster reachable at endpoints.
//
// The Publisher gets its own cancellable context (derived from
// context.Background) which Stop cancels. opts are applied in the order
// given, after all basic fields have been set.
func NewPublisher(endpoints []string, key, value string, opts ...PubOption) *Publisher {
	p := &Publisher{
		endpoints:  endpoints,
		key:        key,
		value:      value,
		pauseChan:  make(chan lang.PlaceholderType),
		resumeChan: make(chan lang.PlaceholderType),
	}
	p.ctx, p.cancel = context.WithCancel(context.Background())

	for i := range opts {
		opts[i](p)
	}

	return p
}

// KeepAlive registers the publisher in etcd and then keeps the lease renewed
// in the background.
//
// The first registration is done synchronously so that an unusable etcd is
// reported to the caller as an error. After it succeeds, Stop is registered
// as a wrap-up listener and the asynchronous renewal is started.
func (p *Publisher) KeepAlive() error {
	client, err := p.doRegister()
	if err != nil {
		return err
	}

	proc.AddWrapUpListener(p.Stop)

	return p.keepAliveAsync(client)
}
// doKeepalive re-establishes the registration after the keepalive channel
// was closed. Once per second it tries to register again and restart the
// asynchronous renewal, until one attempt succeeds or p.ctx is cancelled.
//
// Failed attempts are logged and retried on the next tick. The returned
// error is always nil; the error result is kept for interface stability.
func (p *Publisher) doKeepalive() error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		// Wait for the next tick, but give up immediately on cancellation
		// so that Stop does not have to sit out the rest of a tick.
		select {
		case <-p.ctx.Done():
			return nil
		case <-ticker.C:
		}

		// select picks randomly when both cases are ready; never start a
		// new registration once the publisher has been stopped.
		if p.ctx.Err() != nil {
			return nil
		}

		cli, err := p.doRegister()
		if err != nil {
			logyt.WarnLog("etcd keepalive retry failed", logyt.LogField{
				Key:   "error",
				Value: err,
			})
			continue
		}

		if err := p.keepAliveAsync(cli); err != nil {
			logyt.WarnLog("etcd keepalive retry failed", logyt.LogField{
				Key:   "error",
				Value: err,
			})
			continue
		}

		// Registration and renewal are running again.
		return nil
	}
}

// Stop cancels the publisher's context and then blocks until every goroutine
// tracked by the publisher's wait group has returned. The keepalive
// goroutine revokes the lease when it observes the cancellation.
func (p *Publisher) Stop() {
	// Signal shutdown first, then wait for the background work to drain.
	p.cancel()
	p.wg.Wait()
}
// keepAliveAsync starts lease renewal for p.lease on client and consumes the
// keepalive responses in a background goroutine.
//
// It returns an error only when client.KeepAlive itself fails; in that case
// no goroutine is started. The goroutine is tracked by p.wg and ends when
// either the keepalive channel is closed or p.ctx is cancelled.
func (p *Publisher) keepAliveAsync(client EtcdClient) error {
	ch, err := client.KeepAlive(p.ctx, p.lease)
	if nil != err {
		return err
	}
	p.wg.Add(1)
	threading.GoSafe(func() {
		defer p.wg.Done()
		for {
			select {
			case _, ok := <-ch:
				if !ok {
					logyt.WarnLog("etcd keepalive receive chan closed")
					// Revoke the old lease first, then continue with a new
					// one. In the end only a single goroutine performs the
					// renewal: this goroutine finishes and doKeepalive starts
					// a new one (prefer doing the work in the new goroutine
					// over doing more in the current one).
					p.revoke(client)
					logyt.WarnLog("etcd keepalive lease revoked")
					if err := p.doKeepalive(); nil != err {
						logyt.WarnLog("etcd keepalive retry failed", logyt.LogField{
							Key:   "error",
							Value: err,
						})
					}
					return // leave the current goroutine
				}
				// A received response needs no handling; keep looping.
			// p.ctx is what controls when this goroutine exits.
			case <-p.ctx.Done():
				p.revoke(client)
				logyt.InfoLog("etcd keepalive lease revoked")
				return
			}
		}
	})
	return nil
}
// revoke revokes the publisher's current lease on client. It is best-effort:
// a failure is logged, including the error, and not returned.
//
// Revoke must not use p.ctx: revoke is normally called after p.ctx has been
// cancelled, and a cancelled context would make the call fail. The client's
// own context is used instead, so the call unblocks once the client itself
// is closed.
func (p *Publisher) revoke(client EtcdClient) {
	if _, err := client.Revoke(client.Ctx(), p.lease); err != nil {
		logyt.WarnLog("etcd keepalive revoke failed", logyt.LogField{
			Key:   "error",
			Value: err,
		})
	}
}

// doRegister obtains the etcd client for the publisher's endpoints, creates
// a lease and registers the value in etcd. The lease returned by register is
// stored in p.lease.
//
// The client must always be fetched through the Registry, so that a
// connection can be obtained again after the registry centre restarts.
func (p *Publisher) doRegister() (EtcdClient, error) {
	client, err := GetRegistry().GetConn(p.endpoints)
	if err != nil {
		return nil, err
	}

	lease, regErr := p.register(client)
	p.lease = lease
	return client, regErr
}

// register grants a lease with TimeToLive on client and writes the
// publisher's value to etcd under that lease. It also sets p.fullKey.
//
// It returns the granted lease id, or clientv3.NoLease when granting the
// lease or building the stored value fails. When only the final Put fails,
// the granted lease id is returned together with the error.
func (p *Publisher) register(client EtcdClient) (clientv3.LeaseID, error) {
	// The client's own ctx is used so that this call also notices when the
	// client is closed.
	grant, err := client.Grant(client.Ctx(), TimeToLive)
	if err != nil {
		return clientv3.NoLease, err
	}

	// The service id part of the full key is the configured id when set,
	// otherwise the lease id.
	leaseID := grant.ID
	suffix := int64(leaseID)
	if p.id > 0 {
		suffix = p.id
	}
	p.fullKey = makeEtcdKey(p.key, suffix)

	payload, err := Parse(p.key, p.fullKey, Endpoint{Addr: p.value})
	if err != nil {
		return clientv3.NoLease, err
	}

	_, err = client.Put(client.Ctx(), p.fullKey, payload, clientv3.WithLease(leaseID))
	return leaseID, err
}
// makeEtcdKey builds the full etcd key for a registration: key, followed by
// Delimiter (formatted as a single character), followed by id in decimal.
func makeEtcdKey(key string, id int64) string {
	return fmt.Sprintf("%s%c%d", key, Delimiter, id)
}

// EtcdConf is the etcd section of a configuration: the cluster hosts, the
// key to register under, an optional numeric id and optional credentials.
type EtcdConf struct {
	Hosts []string `mapstructure:"hosts" json:"hosts,omitempty"`
	Key   string   `mapstructure:"key" json:"key,omitempty"`
	ID    int64    `mapstructure:"id" json:"id,omitempty"`
	User  string   `mapstructure:"user" json:"user,omitempty"`
	Pass  string   `mapstructure:"pass" json:"pass,omitempty"`
}

// HasAccount reports whether both a user name and a password are configured.
func (c EtcdConf) HasAccount() bool {
	return c.User != "" && c.Pass != ""
}

// HasID reports whether a positive ID is configured.
func (c EtcdConf) HasID() bool {
	hasID := c.ID > 0
	return hasID
}

// Validate checks that the configuration is usable. It returns
// errEmptyEtcdHosts when no hosts are configured, errEmptyEtcdKey when the
// key is empty, and nil otherwise. Hosts are checked before the key.
func (c EtcdConf) Validate() error {
	switch {
	case len(c.Hosts) == 0:
		return errEmptyEtcdHosts
	case len(c.Key) == 0:
		return errEmptyEtcdKey
	default:
		return nil
	}
}
